package cn.doitedu.rtdw.data_etl;

import beans.EventBean;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

/**
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">多易教育</a>
 * @QQ: 657270652
 * @Date: 2023/2/5
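 * @Desc: 学大数据，到多易教育 (Learn big data at DoIt Education)
 * 视频播放时长分析轻度聚合计算代码
 * (Light-aggregation ETL job for video play-duration analysis: Kafka play events are enriched
 * with HBase video metadata, folded into per-play segments and written to Doris.)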
 **/
public class E05_EtlJob_VideoAgg01 {

    /**
     * Builds and runs the job.
     *
     * <p>The pipeline:
     * <ul>
     *   <li>reads video events from Kafka;</li>
     *   <li>enriches them with video metadata from HBase through a processing-time lookup join;</li>
     *   <li>folds them into play segments per (user_id, video_play_id);</li>
     *   <li>writes the segments to the Doris table {@code dws.video_agg_01}.</li>
     * </ul>
     *
     * @param args unused
     * @throws Exception if table registration, job submission or the running job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/checkpoint");
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        registerEventSourceTable(tEnv);
        registerVideoDimTable(tEnv);
        registerEnrichedPlayView(tEnv);

        // Core logic: turn the view into a table object, then into a stream, and fold the
        // events of each (user_id, video_play_id) into play segments.
        DataStream<VideoAggBean> ds = tEnv.toDataStream(tEnv.from("tmp"), VideoAggBean.class);
        SingleOutputStreamOperator<VideoAggBean> resultStream =
                ds.keyBy(bean -> Tuple2.of(bean.getUser_id(), bean.getVideo_play_id()),
                                TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {}))
                        .process(new PlaySegmentFunction());

        // Register the processed stream as a temporary view so SQL can read it.
        tEnv.createTemporaryView("resView", resultStream);

        registerDorisSinkTable(tEnv);

        // executeSql submits ONE job containing the whole pipeline, including the DataStream
        // operators above (they feed resView). Calling env.execute() as well would submit a
        // second job that re-runs source -> lookup -> process with no sink, so instead block
        // on the INSERT job here.
        tEnv.executeSql(
                " insert into video_agg_01_doris  "
                        +" select                          "
                        +"   to_date(date_format(to_timestamp_ltz(play_start_time,3),'yyyy-MM-dd'))  as start_dt "
                        +"   ,user_id              "
                        +"   ,release_channel      "
                        +"   ,device_type          "
                        +"   ,video_id             "
                        +"   ,video_play_id        "
                        +"   ,video_name           "
                        +"   ,video_type           "
                        +"   ,video_album          "
                        +"   ,video_author         "
                        +"   ,video_timelong       "
                        +"   ,create_time          "
                        +"   ,play_start_time      "
                        +"   ,play_end_time        "
                        +" from resView            "
        ).await();
    }

    /**
     * Registers {@code mall_events_commondim_kfksource}.
     *
     * <p>It is the JSON user-behaviour log read from Kafka topic {@code mall-events-wide},
     * starting at the latest offset. It has a processing-time attribute {@code pt}, which the
     * lookup join in the view uses.
     */
    private static void registerEventSourceTable(StreamTableEnvironment tEnv) {
        tEnv.executeSql(
                " CREATE TABLE mall_events_commondim_kfksource(         "
                        + "     user_id           INT,                           "
                        + "     event_id          string,                        "
                        + "     event_time        bigint,                        "
                        + "     release_channel   string,                        "
                        + "     device_type       string,                        "
                        + "     properties        map<string,string>,            "
                        + "     pt  AS proctime()                                "
                        + " ) WITH (                                             "
                        + "  'connector' = 'kafka',                              "
                        + "  'topic' = 'mall-events-wide',                       "
                        + "  'properties.bootstrap.servers' = 'doitedu:9092',    "
                        + "  'properties.group.id' = 'testGroup',                "
                        + "  'scan.startup.mode' = 'latest-offset',            "
                        + "  'value.format'='json',                              "
                        + "  'value.json.fail-on-missing-field'='false',         "
                        + "  'value.fields-include' = 'EXCEPT_KEY')              ");
    }

    /**
     * Registers {@code cms_video_hbase}.
     *
     * <p>It is the logical mapping of the HBase video-info dimension table
     * {@code dim_video_info}: row key {@code id}, with all attributes in column family {@code f}.
     */
    private static void registerVideoDimTable(StreamTableEnvironment tEnv) {
        tEnv.executeSql(
                "CREATE TABLE cms_video_hbase( " +
                        " id INT, " +
                        " f ROW<video_name STRING, video_type STRING, video_album STRING, " +
                        "video_author STRING, video_timelong INT,create_time TIMESTAMP(3), update_time TIMESTAMP(3)>, " +
                        " PRIMARY KEY (id) NOT ENFORCED " +
                        ") WITH (                             " +
                        " 'connector' = 'hbase-2.2',          " +
                        " 'table-name' = 'dim_video_info',     " +
                        " 'zookeeper.quorum' = 'doitedu:2181' " +
                        ")");
    }

    /**
     * Registers the temporary view {@code tmp}.
     *
     * <p>The view keeps only the video events ({@code video_play}, {@code video_hb},
     * {@code video_pause}, {@code video_resume}, {@code video_stop}). It LEFT-joins each event
     * to {@code cms_video_hbase} by video id through a processing-time lookup, so the dimension
     * columns are NULL when the lookup misses. {@code play_start_time} and {@code play_end_time}
     * are 0 placeholders, filled in later by {@link PlaySegmentFunction}.
     *
     * <p>Rows without {@code user_id} or {@code event_time} are dropped. The source tolerates
     * missing JSON fields, but {@code user_id} is part of the keyBy key and {@code event_time}
     * maps to a primitive {@code long} in {@link VideoAggBean}, so such rows would otherwise
     * fail the job.
     */
    private static void registerEnrichedPlayView(StreamTableEnvironment tEnv) {
        tEnv.executeSql(
                " CREATE TEMPORARY VIEW tmp AS                               "
                        + "  WITH play_log AS (                                        "
                        + "    SELECT                                                  "
                        + "      user_id,                                              "
                        + "      event_id,                                             "
                        + "      event_time,                                           "
                        + "      release_channel,                                      "
                        + "      device_type,                                          "
                        + "      cast(properties['video_id'] as int) as video_id,      "
                        + "      properties['play_id'] as video_play_id,                "
                        + "      pt                                                    "
                        + "    FROM mall_events_commondim_kfksource                    "
                        + "    WHERE event_id in ('video_play','video_hb',             "
                        + "      'video_pause','video_resume','video_stop')            "
                        + "      AND user_id IS NOT NULL                               "
                        + "      AND event_time IS NOT NULL                            "
                        + "  )                                                         "
                        + "  SELECT                                                    "
                        + "      user_id,                                              "
                        + "      event_id,                                             "
                        + "      event_time,                                           "
                        + "      release_channel,                                      "
                        + "      device_type,                                          "
                        + "      video_id,                                             "
                        + "      video_play_id,                                              "
                        + "      video_name,                                           "
                        + "      video_type,                                           "
                        + "      video_album,                                          "
                        + "      video_author,                                         "
                        + "      video_timelong,                                       "
                        + "      create_time ,                                         "
                        + "      update_time ,                                         "
                        + "      0 as play_start_time ,                                "
                        + "      0 as play_end_time                                    "
                        + "  FROM play_log p                                           "
                        + "  left join cms_video_hbase FOR SYSTEM_TIME AS OF p.pt AS c "
                        + "  ON p.video_id = c.id                                      "
        );
    }

    /**
     * Registers {@code video_agg_01_doris}, the logical mapping of Doris table
     * {@code dws.video_agg_01}.
     *
     * <p>The label prefix embeds the current time, so each run uses a new prefix (presumably to
     * keep Doris load labels unique across restarts; confirm).
     */
    private static void registerDorisSinkTable(StreamTableEnvironment tEnv) {
        tEnv.executeSql(
                " create table video_agg_01_doris(   "
                        + "     start_dt         DATE,         "
                        + "     user_id          INT,          "
                        + "     release_channel  VARCHAR(20),  "
                        + "     device_type      VARCHAR(20),  "
                        + "     video_id         INT ,         "
                        + "     video_play_id    VARCHAR(20),  "
                        + "     video_name       VARCHAR(80),  "
                        + "     video_type       VARCHAR(20),  "
                        + "     video_album      VARCHAR(40),  "
                        + "     video_author     VARCHAR(40),  "
                        + "     video_timelong   BIGINT,       "
                        + "     create_time      TIMESTAMP,    "
                        + "     play_start_time  BIGINT ,      "
                        + "     play_end_time    BIGINT        "
                        + " ) WITH (                           "
                        + "    'connector' = 'doris',          "
                        + "    'fenodes' = 'doitedu:8030',     "
                        + "    'table.identifier' = 'dws.video_agg_01',  "
                        + "    'username' = 'root',            "
                        + "    'password' = '',                "
                        + "    'sink.label-prefix' = 'doris_tl" + System.currentTimeMillis() + "')"
        );
    }

    /**
     * Folds the events of one (user_id, video_play_id) key into play segments.
     *
     * <p>A new segment starts on {@code video_play}, on {@code video_resume}, or on any event
     * that arrives while no segment is open. Its {@code play_start_time} is that event's
     * {@code event_time}. Every event, including the opening one, sets the open segment's
     * {@code play_end_time} to its own {@code event_time}. The updated segment is emitted on
     * every event.
     *
     * <p>On {@code video_stop} the segment is emitted one last time and the key's state is
     * cleared, so finished plays do not keep state forever.
     *
     * <p>NOTE(review): plays that never send {@code video_stop} still keep state; consider a
     * state TTL. A {@code video_stop} that directly follows {@code video_pause} also stretches
     * the paused segment up to the stop time. Confirm that this is intended.
     */
    private static final class PlaySegmentFunction
            extends KeyedProcessFunction<Tuple2<Integer, String>, VideoAggBean, VideoAggBean> {

        /** The currently open segment for this key; null when none is open. */
        private transient ValueState<VideoAggBean> state;

        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("va", VideoAggBean.class));
        }

        @Override
        public void processElement(VideoAggBean bean, Context ctx, Collector<VideoAggBean> out) throws Exception {
            String eventId = bean.getEvent_id();
            VideoAggBean segment = state.value();

            // A play / resume event opens a new segment; so does any event when none is open.
            if (segment == null || "video_play".equals(eventId) || "video_resume".equals(eventId)) {
                bean.setPlay_start_time(bean.getEvent_time());
                segment = bean;
            }
            segment.setPlay_end_time(bean.getEvent_time());

            if ("video_stop".equals(eventId)) {
                // The play is over: release its keyed state instead of keeping it forever.
                state.clear();
            } else {
                state.update(segment);
            }

            // Emit the latest version of the current segment.
            out.collect(segment);
        }
    }

    /**
     * One row of the enriched view {@code tmp}; after processing, one play segment.
     *
     * <p>Field names match the SQL column names, because the class is used both for the
     * table-to-stream conversion and for registering {@code resView}.
     *
     * <p>Columns that can be NULL must use reference types. These are {@code video_id} (a cast
     * of an optional event property) and {@code video_timelong} (from the LEFT lookup join).
     * Flink maps primitive fields to NOT NULL types, so a missed lookup would otherwise fail the
     * conversion.
     *
     * <p>NOTE(review): this class is also the type of the keyed state {@code "va"}. Changing
     * its fields changes the state schema, so existing checkpoints or savepoints may not
     * restore.
     */
    @NoArgsConstructor
    @AllArgsConstructor
    @Data
    public static class VideoAggBean {
        int user_id;
        String event_id;
        long event_time;
        String release_channel;
        String device_type;
        Integer video_id;
        String video_play_id;
        String video_name;
        String video_type;
        String video_album;
        String video_author;
        Integer video_timelong;
        Timestamp create_time;
        Timestamp update_time;
        long play_start_time;
        long play_end_time;
    }
}
